-
-
Notifications
You must be signed in to change notification settings - Fork 30.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
gh-124309: Modernize the staggered_race
implementation to support eager task factories
#124390
gh-124309: Modernize the staggered_race
implementation to support eager task factories
#124390
Conversation
ZeroIntensity
commented
Sep 23, 2024
•
edited by bedevere-app
bot
Loading
edited by bedevere-app
bot
- Issue: Happy Eyeballs crashes with Eager Task Factory #124309
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're using private methods of TaskGroup and starting tasks on the loop rather than the TaskGroup
I think I'm just going to refactor this to not use |
I think it's worth persevering with TaskGroup, you just need to write it without using add_done_callback or private attributes |
I'll try it, but I'm worried that it isn't possible when considering an eager task factory. The previous implementation used a variation of a task group (a list containing tasks, since it predated While we're here, |
A demo of what I mean wrt TaskGroup: """Support for running coroutines in parallel with staggered start times."""
__all__ = 'staggered_race',
from . import locks
from . import tasks
from . import taskgroups
async def staggered_race(coro_fns, delay, *, loop=None):
"""Run coroutines with staggered start times and take the first to finish.
This method takes an iterable of coroutine functions. The first one is
started immediately. From then on, whenever the immediately preceding one
fails (raises an exception), or when *delay* seconds has passed, the next
coroutine is started. This continues until one of the coroutines complete
successfully, in which case all others are cancelled, or until all
coroutines fail.
The coroutines provided should be well-behaved in the following way:
* They should only ``return`` if completed successfully.
* They should always raise an exception if they did not complete
successfully. In particular, if they handle cancellation, they should
probably reraise, like this::
try:
# do work
except asyncio.CancelledError:
# undo partially completed work
raise
Args:
coro_fns: an iterable of coroutine functions, i.e. callables that
return a coroutine object when called. Use ``functools.partial`` or
lambdas to pass arguments.
delay: amount of time, in seconds, between starting coroutines. If
``None``, the coroutines will run sequentially.
loop: the event loop to use.
Returns:
tuple *(winner_result, winner_index, exceptions)* where
- *winner_result*: the result of the winning coroutine, or ``None``
if no coroutines won.
- *winner_index*: the index of the winning coroutine in
``coro_fns``, or ``None`` if no coroutines won. If the winning
coroutine may return None on success, *winner_index* can be used
to definitively determine whether any coroutine won.
- *exceptions*: list of exceptions returned by the coroutines.
``len(exceptions)`` is equal to the number of coroutines actually
started, and the order is the same as in ``coro_fns``. The winning
coroutine's entry is ``None``.
"""
# TODO: when we have aiter() and anext(), allow async iterables in coro_fns.
winner_result = None
winner_index = None
exceptions = []
class _Done(Exception):
pass
async def run_one_coro(this_index, coro_fn, this_failed):
try:
result = await coro_fn()
except (SystemExit, KeyboardInterrupt):
raise
except BaseException as e:
exceptions[this_index] = e
this_failed.set() # Kickstart the next coroutine
else:
# Store winner's results
nonlocal winner_index, winner_result
# There could be more than one winner
winner_index = this_index
winner_result = result
raise _Done
try:
async with taskgroups.TaskGroup() as tg:
for this_index, coro_fn in enumerate(coro_fns):
this_failed = locks.Event()
exceptions.append(None)
tg.create_task(run_one_coro(this_index, coro_fn, this_failed))
try:
await tasks.wait_for(this_failed.wait(), delay)
except TimeoutError:
pass
except* _Done:
pass
return winner_result, winner_index, exceptions |
Co-authored-by: Thomas Grainger <tagrain@gmail.com>
Misc/NEWS.d/next/Library/2024-09-23-18-18-23.gh-issue-124309.iFcarA.rst
Outdated
Show resolved
Hide resolved
Co-authored-by: Jelle Zijlstra <jelle.zijlstra@gmail.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @ZeroIntensity.
Co-authored-by: Carol Willing <carolcode@willingconsulting.com>
Thanks @ZeroIntensity for the PR, and @kumaraditya303 for merging it 🌮🎉.. I'm working now to backport this PR to: 3.12, 3.13. |
…port eager task factories (pythonGH-124390) (cherry picked from commit de929f3) Co-authored-by: Peter Bierma <zintensitydev@gmail.com> Co-authored-by: Thomas Grainger <tagrain@gmail.com> Co-authored-by: Jelle Zijlstra <jelle.zijlstra@gmail.com> Co-authored-by: Carol Willing <carolcode@willingconsulting.com> Co-authored-by: Kumar Aditya <kumaraditya@python.org>
Sorry, @ZeroIntensity and @kumaraditya303, I could not cleanly backport this to
|
GH-124573 is a backport of this pull request to the 3.13 branch. |
GH-124574 is a backport of this pull request to the 3.12 branch. |
…pport e… (#124574) gh-124309: Modernize the `staggered_race` implementation to support eager task factories (#124390) Co-authored-by: Thomas Grainger <tagrain@gmail.com> Co-authored-by: Jelle Zijlstra <jelle.zijlstra@gmail.com> Co-authored-by: Carol Willing <carolcode@willingconsulting.com> Co-authored-by: Kumar Aditya <kumaraditya@python.org> (cherry picked from commit de929f3) Co-authored-by: Peter Bierma <zintensitydev@gmail.com>
…n to support eager task factories (python#124390)" This reverts commit de929f3.
…wnstream (pythonGH-124810) * Revert "pythonGH-124639: add back loop param to staggered_race (pythonGH-124700)" This reverts commit e0a41a5. * Revert "pythongh-124309: Modernize the `staggered_race` implementation to support eager task factories (pythonGH-124390)" This reverts commit de929f3. (cherry picked from commit 133e929) Co-authored-by: Peter Bierma <zintensitydev@gmail.com>
…ownstream (GH-124810) (#124817) gh-124309: Revert eager task factory fix to prevent breaking downstream (GH-124810) * Revert "GH-124639: add back loop param to staggered_race (GH-124700)" This reverts commit e0a41a5. * Revert "gh-124309: Modernize the `staggered_race` implementation to support eager task factories (GH-124390)" This reverts commit de929f3. (cherry picked from commit 133e929) Co-authored-by: Peter Bierma <zintensitydev@gmail.com>